dbt-athenaでAudit_helperのテーブル監査支援機能を試してみた
データアナリティクス事業本部 機械学習チームの鈴木です。
dbtのAudit_helperを試してみたので、簡単にですがご紹介します。
今回はdbt-athena-communityで使うことを想定して動作を確認しました。
Audit_helperについて
モデルをリファクタリングする際などに、元のモデルと同じ出力を提供するか確認するテーブル監査プロセスを支援するためのPackageです。
以下のブログ記事が大変参考になります。
ブログ記事では非dbtの仕組みで作成していたテーブルとdbtモデルにより作成されるテーブルを比較することを例としていますが、例えばマートのモデルのロジック修正によって期待値を格納したテーブルと差分が出ないかのような回帰テストにも利用できないかと思い、今回調べてみました。
非常に便利にテーブル差分の要約情報を取得できるPackageですが、記事執筆時点ではレコードの重複があるケースで、dbt-athena-communityで使う分には挙動を知っておくべき点があったので後述します。
Audit_helperのインストール
Packageの追加については以下のブログを参考に進めました。
dbtプロジェクトのルート直下に以下のファイルを作成しました。
packages: - package: dbt-labs/audit_helper version: 0.9.0
dbtプロジェクトでdbt deps
コマンドを実行することで、Packageをインストールしました。
dbt deps
なお、dbt Coreおよびdbt-athena-communityのバージョンは以下です。
- dbt Core: 1.5.6
- dbt-athena-community: 1.5.1
compare_queriesマクロを試す
いくつかのマクロが利用できるようになりますが、今回はそのうちのcompare_queries
マクロを試しました。
1. モデルの作成
以下のようにモデルを作成しました。
models ├── audit │ └── audit_sample.sql ├── mart │ └── iris_new.sql └── staging └── sample__sources.yml
順番に確認していきます。
sample__sources.yml
S3バケットに格納したIrisデータセットを読み出すGlueテーブルを参照するソーステーブルです。
※ S3バケットへのデータの配置とGlueテーブルの作成はすでにされているものとします。
version: 2 sources: - name: iris_raw database: awsdatacatalog schema: cm-nayuts-dbt-athena tables: - name: iris
なお、データセットは、下記リンクにて公開されています。
https://archive.ics.uci.edu/ml/datasets/iris
種別ごとの件数は以下のように50件ずつ、合わせて150件のレコードがあります。
レコードの重複を加味すると以下の件数になりました。
iris_new.sql
Irisデータセットを加工したマートです。
SELECT class, sepal_length, sepal_width, petal_length, petal_width FROM {{ source('iris_raw','iris') }} WHERE class <> 'Iris-setosa' UNION ALL SELECT 'Iris-setosa' AS class, CAST(5.2 AS real) AS sepal_length, CAST(3.3 AS real) AS sepal_width, CAST(1.5 AS real) AS petal_length, CAST(0.3 AS real) AS petal_width ;
処理としては、Iris-setosa
の種別のデータを削除し、1件の別のデータに差し替えました。あまりこのようなマート作成処理は書かないようには思いますが、後で見るようにAudit_heplerの出力が見やすいのでこのようにしておきました。
今回はこのマートとデータソースの差分をAudit_heplerを使って確認します。
audit_sample.sql
Audit_heplerが提供する機能のcompare_queries
マクロを含む、監査用のモデルです。
{# in dbt Develop #} {% set old_fct_orders_query %} select sepal_length, sepal_width, petal_length, petal_width, class from {{ref('iris_new')}} {% endset %} {% set new_fct_orders_query %} select sepal_length, sepal_width, petal_length, petal_width, class from {{ source('iris_raw','iris') }} {% endset %} {{ audit_helper.compare_queries( a_query=old_fct_orders_query, b_query=new_fct_orders_query ) }}
マートとデータソースの中身をそれぞれクエリA, Bとして比較しました。compare_queries
マクロにはprimary_key
が指定できますが、今回は該当するキーがないので指定しませんでした。レポジトリのマクロの記載によると、compare_relations
マクロと非常に類似しているとのことで、こちらのマクロではオプションの項目だったので省略可と判断しています。
2. モデルの実行
dbt run
コマンドでモデルを実行しました。
dbt run # 05:08:59 Running with dbt=1.5.6 # 05:08:59 Registered adapter: athena=1.5.1 # 05:08:59 Found 2 models, 0 tests, 0 snapshots, 0 analyses, 467 macros, 0 operations, 0 seed files, 1 source, 0 exposures, 0 metrics, 0 groups # 05:08:59 # 05:09:03 Concurrency: 1 threads (target='dev') # 05:09:03 # 05:09:03 1 of 2 START sql table model cm-nayuts-dbt-athena.iris_new ..................... [RUN] # 05:09:08 1 of 2 OK created sql table model cm-nayuts-dbt-athena.iris_new ................ [OK 101 in 5.12s] # 05:09:08 2 of 2 START sql table model cm-nayuts-dbt-athena.audit_sample ................. [RUN] # 05:09:13 2 of 2 OK created sql table model cm-nayuts-dbt-athena.audit_sample ............ [OK 3 in 5.12s] # 05:09:13 # 05:09:13 Finished running 2 table models in 0 hours 0 minutes and 13.86 seconds (13.86s). # 05:09:13 # 05:09:13 Completed successfully # 05:09:13 # 05:09:13 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
audit_sample
テーブルが作成されているので中身を確認しました。
in_a
はマート、in_b
はソーステーブルからのデータになります。それぞれにあるデータかどうかで分けて件数と割合を出力してくれています。
結果は想定とは若干ズレがあり、Athenaエンジンv3のexcept
の仕様に由来するものだと思われました。重複するレコードは1件分となっているようです。dbt-athena-communityプラグインを使った場合に、重複したレコード件数は1件にまとめない結果の出力を求めている場合は注意した方がよさそうです。PKがあるデータに対してであれば問題ないと推測されます。
参考までに以下が実行されたモデル作成のSQLになります。
-- /* {"app": "dbt", "dbt_version": "1.5.6", "profile_name": "sample_project", "target_name": "dev", "node_id": "model.sample_project.audit_sample"} */ create table "awsdatacatalog"."cm-nayuts-dbt-athena"."audit_sample" with ( table_type='hive', is_external=true,external_location='s3://データを入れたバケット名/data/cm-nayuts-dbt-athena/audit_sample/eb5177c1-abbe-40ec-9f49-4f6d6a21fa9b', format='parquet' ) as with a as ( select sepal_length, sepal_width, petal_length, petal_width, class from "awsdatacatalog"."cm-nayuts-dbt-athena"."iris_new" ), b as ( select sepal_length, sepal_width, petal_length, petal_width, class from "awsdatacatalog"."cm-nayuts-dbt-athena"."iris" ), a_intersect_b as ( select * from a intersect select * from b ), a_except_b as ( select * from a except select * from b ), b_except_a as ( select * from b except select * from a ), all_records as ( select *, true as in_a, true as in_b from a_intersect_b union all select *, true as in_a, false as in_b from a_except_b union all select *, false as in_a, true as in_b from b_except_a ), summary_stats as ( select in_a, in_b, count(*) as count from all_records group by 1, 2 ), final as ( select *, round(100.0 * count / sum(count) over (), 2) as percent_of_total from summary_stats order by in_a desc, in_b desc ) select * from final
該当のマクロのソースは以下になります。
https://github.com/dbt-labs/dbt-audit-helper/blob/main/macros/compare_queries.sql
終わりに
dbt-athena-communityプラグインでAudit_helperを試してみたご紹介でした。
参考になりましたら幸いです。